AWS CLIを使用してKinesis Data StreamsでLambda関数を使用してみた。
概要
Kinesis Data Streams を使用して、大規模なデータを収集し、処理することができます。この記事では、AWS CLIを使用してKinesis Data StreamsでLambda関数を使用してみました。ここでLambda関数はKinesis Data Streamのコンシューマーとして機能します。Lambda関数は、Kinesis Data Streamsからのイベントを消費します。データレコードがストリームに書き込まれると、Lambda はストリームをポーリングし、ストリームで新しいレコードを検出すると Lambda 関数を呼び出します。
やってみた
IAMロールの作成
- Lambda関数にKinesis Data StreamsとCloudWatchへのアクセスを許可する実行ロールをしておきます。
- 以下のポリシーを含むjsonファイルを作成しておきます。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
- 次のコマンドを使用して、上記のポリシーでIAMロールを作成しておきます。
//create an IAM role aws iam create-role --role-name "lambda-kinesis-role" --assume-role-policy-document file://assumeRole.json //Output { "Role": { "Path": "/", "RoleName": "lambda-kinesis-role", "RoleId": "............", "Arn": "arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role", "CreateDate": "2022-04-28T07:12:13+00:00", "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] } } }
- [AWSLambdaKinesisExecutionRole]ポリシーをIAMロールにアタッチしておきます。
aws iam attach-role-policy --role-name lambda-kinesis-role --policy-arn 'arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole'
Lambda 関数の作成
- 次のコードでindex.jsファイルを作成しておきます。
exports.handler = function(event, context) { event.Records.forEach(function(record) { var data = Buffer.from(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded data:', data); }); };
- デプロイパッケージを作成しておきます。
zip function.zip index.js
- 次のコマンドを使用してLambda関数を作成しておきます。
//create Lambda Function aws lambda create-function --function-name Lambda-Kinesis \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs14.x \ --role arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role \ --region us-east-1 //Output { "FunctionName": "Lambda-Kinesis", "FunctionArn": "arn:aws:lambda:us-east-1:xxxxxxxxxxxx:function:Lambda-Kinesis", "Runtime": "nodejs14.x", "Role": "arn:aws:iam::xxxxxxxxxxxx:role/lambda-kinesis-role", "Handler": "index.handler", "CodeSize": 324, "Description": "", "Timeout": 3, "MemorySize": 128, "LastModified": "2022-04-28T07:32:51.018+0000", "CodeSha256": "..........", "Version": "$LATEST", "TracingConfig": { "Mode": "PassThrough" }, "RevisionId": ".............", "State": "Pending", "StateReason": "The function is being created.", "StateReasonCode": "Creating", "PackageType": "Zip", "Architectures": [ "x86_64" ], "EphemeralStorage": { "Size": 512 } }
Kinesis Data Streamの作成
- 次のコマンドを使用してData Streamを作成しておきます。
aws kinesis create-stream --stream-name lambda-kinesis-stream --shard-count 1 --region us-east-1
- [describe-stream]コマンドを実行して、ストリームの詳細を取得します。
//Describe the Data Stream aws kinesis describe-stream --stream-name lambda-kinesis-stream --region us-east-1 //Output { "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": ".............." }, "SequenceNumberRange": { "StartingSequenceNumber": "............." } } ], "StreamARN": "arn:aws:kinesis:us-east-1:xxxxxxxxxxx:stream/lambda-kinesis-stream", "StreamName": "lambda-kinesis-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": "2022-04-28T13:04:46+05:30" } }
Event Source Mappingの作成
-
Lambda でイベントソースを追加するために、関数名とデータストリームArnを使用して次のコマンドを実行しておきます。
aws lambda create-event-source-mapping --function-name Lambda-Kinesis \ --event-source arn:aws:kinesis:us-east-1:xxxxxxxxxxx:stream/lambda-kinesis-stream \ --batch-size 100 --starting-position LATEST --region us-east-1
テストする
- イベントソースマッピングをテストするには、イベントレコードを Kinesis ストリームに追加しておきます。次のコマンドを使用して、データレコードをデータストリームに追加しておきます。
//add data record to kinesis data stream aws kinesis put-record --stream-name lambda-kinesis-stream --partition-key 1 \ --data "Hello, Welcome" --cli-binary-format raw-in-base64-out --region us-east-1 //Output { "ShardId": "shardId-000000000000", "SequenceNumber": "11111122222223333333344444444555555556666667899754" }
- データレコードが追加されると、Lambda関数が呼び出されます。この関数は、レコードからデータをデコードしてログに記録します。CloudWatchログでアウトプットを見ることができます。
まとめ
AWS CLIを使用してKinesis Data StreamsでLambda関数を使用してみました。他のコンシューマーとKinesis Data Streamsを試すことができます。
Reference :